package com.sophie.chapter2;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class ProducerDefineSerializer {

    private static final String brokerList = "localhost:9092";

    private static final String topic = "heima";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        method1();
        method2();
    }

    public static void method1() throws ExecutionException, InterruptedException {
        Properties properties = new Properties();

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CompanySerializer.class.getName());

        KafkaProducer<String, Company> producer = new KafkaProducer<String, Company>(properties);

        Company company = Company.builder().name("kafka").address("长沙").build();
        ProducerRecord<String, Company> record = new ProducerRecord<>(topic, company);
        producer.send(record).get();
        producer.close();
    }

    public static void method2() throws ExecutionException, InterruptedException {
        Properties properties = new Properties();

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProducerDefineSerializer.class.getName());

        KafkaProducer<String, Company> producer = new KafkaProducer<String, Company>(properties);

        Company company = Company.builder().name("kafka").address("长沙").build();
        ProducerRecord<String, Company> record = new ProducerRecord<>(topic, company);
        producer.send(record).get();
        producer.close();
    }
}
